0. Background

State machine, the most famous pattern I think, could be not only used in object oriented programming, but also in process oriented one. We could define state for anything, object, process, conclusion, data, etc… Just let me introduce one scenario for state machine usage, an useful one to deal with the multiple thread object.

Just as usual, we start with code.

Supposing we are designing a data fetcher to get data from web server/local server… anything, through any protocol, named ‘ZPayloadFetcher’

// ZPayloadFetcher.h

#ifndef _Z_PAYLOAD_FETCHER_
#define _Z_PAYLOAD_FETCHER_
#include <inttypes.h>
#include <stdio.h>
#include <unistd.h>

#define TRACE(fmt, args...)      \
        printf("[Payload Fetcher]\t " fmt "\n", ## args);

class ZPayloadFetcher
{
public:
	ZPayloadFetcher();
	~ZPayloadFetcher();
	void Initialize();
	void Release();
	int Process();
};

#endif

Here at least 3 common interface is required to be implemented. ‘Initialized’ is dealing with the initialization, such as memory(buffer) allocate, resource init, etc, so the ‘Release’ is the one that would be called when we no longer need the former resources/memory. ‘Process’ is the core one that helps to get all the payload we want. Macro ‘TRACE’ is defined to help print something to debug, or even to prove our code is working here because we do not have actually scenario to test the design. So, uhhhhh, it seems really beautiful without any ‘State Machine’, ‘Multiple thread’, whether are they?

1. Multiple Thread Object

As talked above, this class ‘ZPayloadFetcher’ is designed to fetch data from somewhere else, at least not existed in memory space of current process, so it would takes lots of time if the target payload is big enough when the interface ‘Process’ is called. If current process running ‘ZPayloadFetcher::Process’ with single thread, this process could be blocked and  there would not be any valid response for other input, such as user input, timer, event, ipc call, etc… Of course we could have this ‘Process’ called in a background thread to make this long time function executing asynclly from the main thread, but from this class point of view, we do not know how this interface would be called. To deal with this, it is better to create a work thread and make the multiple thread work inside the class, as one part of it.

Let us extend the class defination

// ZPayloadFetcher.h

#ifndef _Z_PAYLOAD_FETCHER_
#define _Z_PAYLOAD_FETCHER_
#include <inttypes.h>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

#define TRACE(fmt, args...)      \
        printf("[Payload Fetcher]\t " fmt "\n", ## args);

class ZPayloadFetcher
{
public:
	ZPayloadFetcher();
	~ZPayloadFetcher();
	void Initialize();
	void Release();
	int Process();
private:
	void DoProcess();
	pthread_t ProcessThreadId;
	int Progress;

	static void* ThreadProcess(void* f)
	{
		static_cast<ZPayloadFetcher*>(f)->DoProcess();
		return NULL;
	};
};

#endif

Here ‘pthread’ is used for multiple thread, and we have an internal function ‘DoProcess’ to deal with the actual fetching work, a ‘ProcessThreadId’ to store the thread id of the work thread executing ‘DoProcess’. Then it is the time to implement and verify our design.

// ZPayloadFetcher.cpp

#include "ZPayloadFetcher.h"
#include <stdio.h>
#include <assert.h>
#include <string>
#include <unistd.h>

ZPayloadFetcher::ZPayloadFetcher():
	ProcessThreadId(0),
	Progress(0)
{

};
void ZPayloadFetcher::Initialize()
{
	// Initialize scenario
	// ...
	TRACE("Initialization completed");
}

void ZPayloadFetcher::Release()
{
	// Release scenario
	// ...
	if (ProcessThreadId) {
		TRACE("Waiting for Process thread [0x%X] to complete...", ProcessThreadId);
		pthread_join(ProcessThreadId, NULL);
		TRACE("Thread [0x%X] has completed...", ProcessThreadId);
	}
	TRACE("Release completed.");
}

int ZPayloadFetcher::Process()
{
	pthread_create(&ProcessThreadId, NULL, ThreadProcess, this);
	return 0;
}

void ZPayloadFetcher::DoProcess()
{
	TRACE("Process start...");

	// Fetch process scenario
	// ...

	for (int i = 0; i < 10; i++)
	{
		Progress += 10;
		TRACE("Process in progress, [%2d%] completed", Progress);
		sleep(1);
	}
	TRACE("Process complete");
	Progress = 0;
	return;
}

ZPayloadFetcher::~ZPayloadFetcher()
{
	if (ProcessThreadId) {
		TRACE("Waiting for Process thread [0x%X] to complete...", ProcessThreadId);
		pthread_join(ProcessThreadId, NULL);
		TRACE("Thread [0x%X] has completed...", ProcessThreadId);
	}
}

Here we just create the thread and start the real ‘process’ in back ground thread, interface ‘Process’ returns immediately and deal with no real work to avoid main thread blocked. As a result, we need to wait for the work thread to finish when we ‘Release’ or de-construct this object. ‘Progress’ is to record how much our fetch job has completed, then print. We have a ‘main.cpp’ as follow to verify the code.

// main.cpp

#include "ZPayloadFetcher.h"
#include <sys/select.h>
#include <sys/time.h>
#include <pthread.h>
int Fd;
static void* Input(void*)
{
	char input = '\0';
	while (1)
	{
		printf("Input a char to start event....\n");
   		scanf("%c/n", &input);
		if (input < 'a' || input > 'z')
		{
			printf("Invalid input: [%c]..\n", input);
			continue;
		}
		printf("Event [%c] will be executed\n", input);
		if (-1 == write(Fd, &input, 1)) {
			printf("Error writing pipe, event [%c] terminated\n", input);
		}
	}
}
int main()
{
	int pFd[2];
	struct timeval tv;
	fd_set rfds;
	int ret = 0;
	if (pipe(pFd) == -1)
	{
		printf("Fail to open pipe, terminate.\n");
		return 0;
	}
	Fd = pFd[1];
	FD_ZERO(&rfds);
	FD_SET(pFd[0], &rfds);
	bool isExiting = false;
	// create thread for input
	pthread_t threadId = 0;
	ret = pthread_create(&threadId, NULL, Input, NULL);
	ZPayloadFetcher fetcher;

	while (!isExiting)
	{
		printf("Waiting for event....\n");
		printf("[i] -> Initialize payload fetcher\n");
		printf("[r] -> Release payload fetcher\n");
		printf("[p] -> Process payload fetcher\n");
		printf("[x] -> Exit\n");
		//Block in main thread to wait for event
		select(pFd[0] + 1, &rfds, NULL, NULL, &tv);
		read(pFd[0], &ret, sizeof(ret));
		switch (ret)
		{
			case 'i':
			fetcher.Initialize();
			break;
			case 'r':
			fetcher.Release();
			break;
			case 'p':
			fetcher.Process();
			break;
			case 'x':
			isExiting = true;
			break;
			default:
			printf("Unrecognized event, execution aborted.\n");
		}
	}
	printf("Bye\n");
	return 0;
}

In main, the there is a loop in main thread which is blocked by ‘select’, which could be recognized as a simple event loop. Also there is a work thread to monitor the keyboard input to simulate the user inputs. Different scenario would be go though depends on the input of user according to the print information. Since we are using ‘pthread’, ‘-lpthread’ is required in make file.

### Requires

### Targets
SRC += *.cpp
TARGET += PayloadFetcheDemo

##Build
edit: $(SRC)
g++ $(SRC) -o $(TARGET) -std=c++11 -lpthread

Just make and then execute to verify.

Waiting for event....
[i] -> Initialize payload fetcher
[r] -> Release payload fetcher
[p] -> Process payload fetcher
[x] -> Exit
Input a char to start event....
i
Event [i] will be executed
Input a char to start event....
[Payload Fetcher]	 Initialization completed
Waiting for event....
[i] -> Initialize payload fetcher
[r] -> Release payload fetcher
[p] -> Process payload fetcher
[x] -> Exit
p
Invalid input: [
]..
Input a char to start event....
Event [p] will be executed
Input a char to start event....
Waiting for event....
[i] -> Initialize payload fetcher
[r] -> Release payload fetcher
[p] -> Process payload fetcher
[x] -> Exit
[Payload Fetcher]	 Process start...
[Payload Fetcher]	 Process in progress, [10%] completed
[Payload Fetcher]	 Process in progress, [20%] completed
[Payload Fetcher]	 Process in progress, [30%] completed
x
Invalid input: [
]..
Input a char to start event....
Event [x] will be executed
Input a char to start event....
Bye
[Payload Fetcher]	 Waiting for Process thread [0xC19C5700] to complete...
[Payload Fetcher]	 Process in progress, [40%] completed
[Payload Fetcher]	 Process in progress, [50%] completed
[Payload Fetcher]	 Process in progress, [60%] completed
[Payload Fetcher]	 Process in progress, [70%] completed
[Payload Fetcher]	 Process in progress, [80%] completed
[Payload Fetcher]	 Process in progress, [90%] completed
[Payload Fetcher]	 Process in progress, [100%] completed
[Payload Fetcher]	 Process complete
[Payload Fetcher]	 Thread [0xC19C5700] has completed...

So it works as designed, the process would wait for work thread complete and then quit.

But how about we input a process request before initialization request?Or another process request during the former one is just executing?

Waiting for event....
[i] -> Initialize payload fetcher
[r] -> Release payload fetcher
[p] -> Process payload fetcher
[x] -> Exit
Input a char to start event....
p
Event [p] will be executed
Input a char to start event....
Waiting for event....
[i] -> Initialize payload fetcher
[Payload Fetcher]     Process start...
[Payload Fetcher]     Process in progress, [10%] completed
[r] -> Release payload fetcher
[p] -> Process payload fetcher
[x] -> Exit
[Payload Fetcher]     Process in progress, [20%] completed
[Payload Fetcher]     Process in progress, [30%] completed
p
Invalid input: [
]..
Input a char to start event....
Event [p] will be executed
Input a char to start event....
Waiting for event....
[i] -> Initialize payload fetcher
[r] -> Release payload fetcher
[p] -> Process payload fetcher
[x] -> Exit
[Payload Fetcher]     Process start...
[Payload Fetcher]     Process in progress, [40%] completed
[Payload Fetcher]     Process in progress, [50%] completed
[Payload Fetcher]     Process in progress, [60%] completed
[Payload Fetcher]     Process in progress, [70%] completed
[Payload Fetcher]     Process in progress, [80%] completed
[Payload Fetcher]     Process in progress, [90%] completed
[Payload Fetcher]     Process in progress, [100%] completed
[Payload Fetcher]     Process in progress, [110%] completed
[Payload Fetcher]     Process in progress, [120%] completed
[Payload Fetcher]     Process in progress, [130%] completed
[Payload Fetcher]     Process in progress, [140%] completed
[Payload Fetcher]     Process in progress, [150%] completed
[Payload Fetcher]     Process in progress, [160%] completed
[Payload Fetcher]     Process in progress, [170%] completed
[Payload Fetcher]     Process in progress, [180%] completed
[Payload Fetcher]     Process complete
[Payload Fetcher]     Process in progress, [10%] completed
x[Payload Fetcher]     Process in progress, [20%] completed

Invalid input: [
]..
Input a char to start event....
Event [x] will be executed
Input a char to start event....
Bye
[Payload Fetcher]     Waiting for Process thread [0x722C4700] to complete...
[Payload Fetcher]     Process complete
[Payload Fetcher]     Thread [0x722C4700] has completed...

Obviously, we could still process without a initialization scenario, no issue happened because actually we have not use any resource which should be needed during process, but by design this kind of process request should be rejected to avoid unknown issue. Also, according to our design, 2 process request process in parallel is not supported, which means that the second process request should be rejected as the first one is still in progress and not completed yet. Here we could found that the output is not valid because of the shortage of design. How we could distinguish this situation and give the solution? State machine.

2. State Machine Definition.

To tell the truth, according to my experience, normally to deal with the issue described above, we developers prefer to use flags like ‘IsInProgress’, ‘InInitialized’, etc. to distinguish the status and make the logic for each scenario. One bug, one flag, which makes the code more and more unreadable and hard debug. Therefore, from my point of view, a relatively better design is so important at beginning that it could make the code much more easier to maintain and extend. Here comes the the state machine, we may introduce the state machine at the very begin and extend the state when more requirements comes, like parse/validate/…. for the payload fetcher.

so at first we just introduce the state definition as below,

	enum class ZState
	{
		IDLE,
		READY,
		IN_PROGRESS,
		COMPLETED,
		FAILED,
		UNKNOWN
	};

here we just defined several basic states, to identify this where this payload fetcher is, and provide a reference for it to deal with the external requests. Normally, status change relates to operation, which means there is must be operations during the change, so let’s define some operations.

	enum class ZOperation
	{
		INITIALIZE,
		PROCESS_START,
		PROCESS_COMPLETE,
		RELEASE,
		UNKNOWN,
	};

To help debug or trace, I’d like to introduce one trace implemented when state changes.

namespace
{

	// For trace
	const std::string STATES[6]	= {	"IDLE",
					    	"READY",
						"IN_PROGRESS",
						"COMPLETED",
						"FAILED",
						"UNKNOWN" };

	const std::string OPERATIONS[5]	= { "INTIALIZE",
					    "PROCESS START",
					    "PROCESS COMPLETE",
					    "RELEASE",
					    "UNKOWN" };
}
void ZPayloadFetcher::StateChange(ZState nextState, ZOperation operation)
{
	// Assert if the state change is invalid
	// ...

	TRACE("State change: [%s] - > [%s], operation: [%s]",
	       STATES[int(State)].c_str(),
               STATES[int(nextState)].c_str(),
               OPERATIONS[int(operation)].c_str());
	pthread_mutex_lock(&StateChangeMutex);
	State = nextState;
	pthread_mutex_unlock(&StateChangeMutex);
}

So know from the output trace we could exactly get where this payload fetcher is, then it comes to the most important part, we could determine the process logic according to current status when external request comes, here is the completed implementation.

// ZPayloadFetcher.h

#ifndef _Z_PAYLOAD_FETCHER_
#define _Z_PAYLOAD_FETCHER_
#include <inttypes.h>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

#define TRACE(fmt, args...)      \
        printf("[Payload Fetcher]\t " fmt "\n", ## args);

class ZPayloadFetcher
{
public:
	ZPayloadFetcher();
	~ZPayloadFetcher();
	void Initialize();
	void Release();
	int Process();
private:
	enum class ZState
	{
		IDLE,
		READY,
		IN_PROGRESS,
		COMPLETED,
		FAILED,
		UNKNOWN
	};
	enum class ZOperation
	{
		INITIALIZE,
		PROCESS_START,
		PROCESS_COMPLETE,
		RELEASE,
		UNKNOWN,
	};
	void StateChange(ZState nextState, ZOperation operation);
	void DoProcess();
	ZState State;
	pthread_t ProcessThreadId;
	int Progress;
	pthread_mutex_t StateChangeMutex;
	static void* ThreadProcess(void* f)
	{
		static_cast<ZPayloadFetcher*>(f)->DoProcess();
		return NULL;
	};
};

#endif
// ZPayloadFetcher.cpp

#include "ZPayloadFetcher.h"
#include <stdio.h>
#include <assert.h>
#include <string>
#include <unistd.h>
namespace
{

	// For trace
	const std::string STATES[6]	= {	"IDLE",
					    	"READY",
						"IN_PROGRESS",
						"COMPLETED",
						"FAILED",
						"UNKNOWN" };

	const std::string OPERATIONS[5]	= { "INTIALIZE",
					    "PROCESS START",
					    "PROCESS COMPLETE",
					    "RELEASE",
					    "UNKOWN" };
}

ZPayloadFetcher::ZPayloadFetcher():
	State(ZState::UNKNOWN),
	ProcessThreadId(0),
	Progress(0)
{
	pthread_mutexattr_t attr;
	pthread_mutexattr_init(&attr);
	pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
	pthread_mutex_init(&StateChangeMutex, &attr);
	pthread_mutexattr_destroy(&attr);
};
void ZPayloadFetcher::Initialize()
{
	// Initialize scenario
	// ...
	if (State == ZState::IN_PROGRESS)
	{
		TRACE("Fetcher is in progress, initialize aborted!");
		return;
	}
	TRACE("Initialization completed");
	StateChange(ZState::READY, ZOperation::INITIALIZE);
}

void ZPayloadFetcher::Release()
{
	if (State == ZState::IN_PROGRESS)
	{
		TRACE("Fetcher is in progress, Release aborted!");
		return;
	}
	// Release scenario
	// ...
	if (ProcessThreadId) {
		TRACE("Waiting for Process thread [0x%X] to complete...", ProcessThreadId);
		pthread_join(ProcessThreadId, NULL);
		TRACE("Thread [0x%X] has completed...", ProcessThreadId);
	}
	TRACE("Release completed.");
	StateChange(ZState::IDLE, ZOperation::RELEASE);
}

int ZPayloadFetcher::Process()
{
	switch (State)
	{
		case ZState::READY:
		pthread_create(&ProcessThreadId, NULL, ThreadProcess, this);
		StateChange(ZState::IN_PROGRESS, ZOperation::PROCESS_START);
		break;
		case ZState::IN_PROGRESS:
		TRACE("Payload fetcher is in progress, Process aborted!");
		return -1;
		default:
		TRACE("Invalid payload fetcher state, Process aborted, expected: [%s], actual: [%s]",
		       STATES[int(ZState::READY)].c_str(), STATES[int(State)].c_str());
		return -2;
		break;
	}
	return 0;
}

void ZPayloadFetcher::StateChange(ZState nextState, ZOperation operation)
{
	// Assert if the state change is invalid
	// ...

	TRACE("State change: [%s] - > [%s], operation: [%s]",
	       STATES[int(State)].c_str(),
               STATES[int(nextState)].c_str(),
               OPERATIONS[int(operation)].c_str());
	pthread_mutex_lock(&StateChangeMutex);
	State = nextState;
	pthread_mutex_unlock(&StateChangeMutex);
}

void ZPayloadFetcher::DoProcess()
{
	TRACE("Process start...");

	// Fetch process scenario
	// ...

	for (int i = 0; i < 10; i++) 	{
                Progress += 10;
                TRACE("Process in progress, [%2d%] completed", Progress);
   		sleep(1);
  	}
  	TRACE("Process complete");
  	StateChange(ZState::COMPLETED, ZOperation::PROCESS_COMPLETE);
  	Progress = 0;
  	return;
}

ZPayloadFetcher::~ZPayloadFetcher()  
{
  	if (ProcessThreadId)
        {
  		TRACE("Waiting for Process thread [0x%X] to complete...", ProcessThreadId);
  		pthread_join(ProcessThreadId, NULL);
  		TRACE("Thread [0x%X] has completed...", ProcessThreadId);
  	}
  	pthread_mutex_destroy(&StateChangeMutex);
} 

Then let us try with the input and output.

 Waiting for event.... [i] -> Initialize payload fetcher
[r] -> Release payload fetcher
[p] -> Process payload fetcher
[x] -> Exit
Input a char to start event....
p
Event [p] will be executed
Input a char to start event....
[Payload Fetcher]	 Invalid payload fetcher state, Process aborted, expected: [READY], actual: [UNKNOWN]
Waiting for event....
[i] -> Initialize payload fetcher
[r] -> Release payload fetcher
[p] -> Process payload fetcher
[x] -> Exit
i
Invalid input: [
]..
Input a char to start event....
Event [i] will be executed
Input a char to start event....
[Payload Fetcher]	 Initialization completed
[Payload Fetcher]	 State change: [UNKNOWN] - > [READY], operation: [INTIALIZE]
Waiting for event....
[i] -> Initialize payload fetcher
[r] -> Release payload fetcher
[p] -> Process payload fetcher
[x] -> Exit
p
Invalid input: [
]..
Input a char to start event....
Event [p] will be executed
Input a char to start event....
[Payload Fetcher]	 State change: [READY] - > [IN_PROGRESS], operation: [PROCESS START]
Waiting for event....
[i] -> Initialize payload fetcher
[r] -> Release payload fetcher
[p] -> Process payload fetcher
[x] -> Exit
[Payload Fetcher]	 Process start...
[Payload Fetcher]	 Process in progress, [10%] completed
[Payload Fetcher]	 Process in progress, [20%] completed
[Payload Fetcher]	 Process in progress, [30%] completed
[Payload Fetcher]	 Process in progress, [40%] completed
p
Invalid input: [
]..
Input a char to start event....
Event [p] will be executed
Input a char to start event....
[Payload Fetcher]	 Payload fetcher is in progress, Process aborted!
Waiting for event....
[i] -> Initialize payload fetcher
[r] -> Release payload fetcher
[p] -> Process payload fetcher
[x] -> Exit
[Payload Fetcher]	 Process in progress, [50%] completed
[Payload Fetcher]	 Process in progress, [60%] completed
r[Payload Fetcher]	 Process in progress, [70%] completed

Invalid input: [
]..
Input a char to start event....
Event [r] will be executed
Input a char to start event....
[Payload Fetcher]	 Fetcher is in progress, Release aborted!
Waiting for event....
[i] -> Initialize payload fetcher
[r] -> Release payload fetcher
[p] -> Process payload fetcher
[x] -> Exit
[Payload Fetcher]	 Process in progress, [80%] completed
[Payload Fetcher]	 Process in progress, [90%] completed
[Payload Fetcher]	 Process in progress, [100%] completed
[Payload Fetcher]	 Process complete
[Payload Fetcher]	 State change: [IN_PROGRESS] - > [COMPLETED], operation: [PROCESS COMPLETE]
r
Invalid input: [
]..
Input a char to start event....
Event [r] will be executed
Input a char to start event....
[Payload Fetcher]	 Waiting for Process thread [0x31D84700] to complete...
[Payload Fetcher]	 Thread [0x31D84700] has completed...
[Payload Fetcher]	 Release completed.
[Payload Fetcher]	 State change: [COMPLETED] - > [IDLE], operation: [RELEASE]
Waiting for event....
[i] -> Initialize payload fetcher
[r] -> Release payload fetcher
[p] -> Process payload fetcher
[x] -> Exit
p
Invalid input: [
]..
Input a char to start event....
Event [p] will be executed
Input a char to start event....
[Payload Fetcher]	 Invalid payload fetcher state, Process aborted, expected: [READY], actual: [IDLE]
Waiting for event....

Here we could have it verified that, from the payload fetcher point of view, there would be invalid request executing with the help of status machine, this works as what we expected.

3. Summay

I believe no matter how my dear programmer fresh, he knows status machine. The problem is in which scenario it benefits, and how. As talked above, if we have not realized the importance of state design, and just using flags on flags to implement the logic, code will be more and more hard to understand/debug, and there would be no one confident enough to clean up the code. What’s more, when there comes new requirements/scenario, it would be much more easier to extend, since we could just add some status/operations 🙂

Advertisements